package com.hanxiaozhang.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.*;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.CharsetUtil;
import org.junit.Test;

import java.net.InetSocketAddress;

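/**
 * Netty basics.
 * <p>
 * Purpose: build on the plain-NIO examples that come before this one and reuse the same
 * mental model. NIO works with Channel, ByteBuffer and Selector; Netty wraps ByteBuffer
 * as ByteBuf, which additionally introduces the notion of pooling.
 *
 * @Author:hanxinghua
 * @Date: 2021/8/26
 */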
public class BasicNettyTest {


    /**
     * Shows how a {@link ByteBuf}'s indices and capacity evolve while bytes are written.
     * <p>
     * A pooled heap buffer is requested with {@code heapBuffer(8, 20)} and its state is
     * dumped through {@code print} before the first write and after each of five 4-byte
     * writes (20 bytes in total). The buffer is released afterwards so it is handed back
     * to the pool instead of being leaked.
     */
    @Test
    public void byteBufTest() {

        // Allocation option 1: the default allocator
        //     ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(8, 20);

        // Allocation option 2: unpooled
        //     ByteBuf buf = UnpooledByteBufAllocator.DEFAULT.heapBuffer(8, 20);

        // Allocation option 3: pooled
        ByteBuf buf = PooledByteBufAllocator.DEFAULT.heapBuffer(8, 20);
        try {
            print(buf);

            for (int round = 0; round < 5; round++) {
                buf.writeBytes(new byte[]{1, 2, 3, 4});
                print(buf);
            }

            // One more 4-byte write would overflow the buffer (20 bytes already written,
            // second allocation argument is 20):
            //     buf.writeBytes(new byte[]{1, 2, 3, 4});
            //     print(buf);
        } finally {
            // ByteBuf is reference counted; release it even if a write or print fails.
            buf.release();
        }
    }

    /**
     * Dumps the current state of a buffer to standard output, one property per line,
     * followed by a separator line.
     * <p>
     * Reported properties: readability, reader index, readable byte count, writability,
     * writer index, writable byte count, capacity, maximum capacity and whether the
     * buffer is direct.
     *
     * @param buf the buffer to describe
     */
    private static void print(ByteBuf buf) {
        // Collect every report line first, then emit them in order.
        String[] reportLines = {
                "buf.isReadable()[是否可读]: " + buf.isReadable(),
                "buf.readerIndex()        : " + buf.readerIndex(),
                "buf.readableBytes()[可读字的节数] : " + buf.readableBytes(),
                "buf.isWritable()[是否可写] : " + buf.isWritable(),
                "buf.writerIndex()         : " + buf.writerIndex(),
                "buf.writableBytes()[可写的字节数] : " + buf.writableBytes(),
                "buf.capacity()[初始容量]   : " + buf.capacity(),
                "buf.maxCapacity()[最大容量]: " + buf.maxCapacity(),
                "buf.isDirect()[是否直接分配]: " + buf.isDirect(),
                "--------------"
        };
        for (String reportLine : reportLines) {
            System.out.println(reportLine);
        }
    }

    // ----------------------------客户端-------------------------------

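    /*
     * Client side: it connects to a remote peer.
     *  1. It sends data on its own initiative.
     *  2. When will the peer send something to us? That cannot be known in advance, so
     *     reads are driven by events reported through the selector.
     */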

    /**
     * Shows that a {@link NioEventLoopGroup} is also an executor and can be thought of as
     * a thread pool.
     * <p>
     * A group created with two threads is given two tasks, each printing its own greeting
     * once per second. The test then blocks on {@code System.in.read()} so the tasks keep
     * running until input arrives on stdin. Afterwards the group is shut down; the tasks
     * watch for that and stop, so the group's threads are not leaked.
     *
     * @throws Exception if reading from stdin fails
     */
    @Test
    public void loopExecutor() throws Exception {
        // The group plays the role of a thread pool here.
        NioEventLoopGroup selector = new NioEventLoopGroup(2);
        try {
            selector.execute(repeatingGreeter(selector, "hello world001"));
            selector.execute(repeatingGreeter(selector, "hello world002"));

            // Keep the test alive until something is typed on stdin.
            System.in.read();
        } finally {
            selector.shutdownGracefully();
        }
    }

    /**
     * Builds a task that prints {@code message} once per second until {@code group}
     * starts shutting down or the running thread is interrupted.
     *
     * @param group   the group whose shutdown state ends the loop
     * @param message the line to print on every iteration
     * @return the repeating task
     */
    private static Runnable repeatingGreeter(NioEventLoopGroup group, String message) {
        return () -> {
            try {
                while (!group.isShuttingDown()) {
                    System.out.println(message);
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                // Restore the flag so the owning thread can observe the interruption.
                Thread.currentThread().interrupt();
            }
        };
    }


    /**
     * Client, hand-wired variant: the channel is created, registered with an event loop
     * and given its pipeline manually instead of going through {@link Bootstrap}.
     * <p>
     * Steps: register a new {@link NioSocketChannel} with a single-threaded group, add a
     * {@code MyInHandler} to its pipeline, connect to 192.168.150.11:9090, send
     * "hello server" and then block until the channel is closed. The event loop group is
     * always shut down on exit, including when connecting or sending fails.
     *
     * @throws Exception if connecting, sending or waiting for the close fails
     */
    @Test
    public void clientModeComplex() throws Exception {

        NioEventLoopGroup thread = new NioEventLoopGroup(1);
        try {
            // Client mode.
            NioSocketChannel client = new NioSocketChannel();
            // Comparable to epoll_ctl(5,ADD,3): hand the channel to the event loop.
            thread.register(client);

            // Reactive style: behaviour is attached as handlers on the pipeline.
            ChannelPipeline pipeline = client.pipeline();
            pipeline.addLast(new MyInHandler());

            // Reactor model: connect is asynchronous, so wait for it explicitly.
            ChannelFuture connect = client.connect(new InetSocketAddress("192.168.150.11", 9090));
            ChannelFuture connected = connect.sync();

            // Encode with an explicit charset rather than the platform default.
            ByteBuf buf = Unpooled.copiedBuffer("hello server", CharsetUtil.UTF_8);
            ChannelFuture send = client.writeAndFlush(buf);
            send.sync();

            // Block until the connection is closed.
            connected.channel().closeFuture().sync();

            System.out.println("client over....");
        } finally {
            thread.shutdownGracefully();
        }

    }


    /**
     * Client, {@link Bootstrap} variant of {@code clientModeComplex}.
     * <p>
     * The bootstrap creates and registers the channel, installs a {@code MyInHandler} via
     * a {@link ChannelInitializer} and connects to 192.168.150.11:9090. The test then sends
     * "hello server" and blocks until the channel is closed. The event loop group is always
     * shut down on exit, including when connecting or sending fails.
     *
     * @throws InterruptedException if the thread is interrupted while waiting on a future
     */
    @Test
    public void clientModeSimple() throws InterruptedException {

        NioEventLoopGroup group = new NioEventLoopGroup(1);
        try {
            Bootstrap bs = new Bootstrap();
            ChannelFuture connect = bs.group(group)
                    .channel(NioSocketChannel.class)
                    // Option 1: a dedicated initializer class
                    //     .handler(new ChannelInit())
                    // Option 2: an anonymous ChannelInitializer
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline p = ch.pipeline();
                            p.addLast(new MyInHandler());
                        }
                    })
                    .connect(new InetSocketAddress("192.168.150.11", 9090));

            Channel client = connect.sync().channel();

            // Encode with an explicit charset rather than the platform default.
            ByteBuf buf = Unpooled.copiedBuffer("hello server", CharsetUtil.UTF_8);
            ChannelFuture send = client.writeAndFlush(buf);
            send.sync();

            // Block until the connection is closed.
            client.closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }

    }

    // ----------------------------服务端-------------------------------


    /**
     * Server, hand-wired variant: the listening channel is created, registered and bound
     * manually instead of going through {@link ServerBootstrap}.
     * <p>
     * A {@code MyAcceptHandler} on the server pipeline receives each accepted connection
     * and registers it with the same single-threaded group. The server binds to
     * 192.168.150.1:9090 and the test blocks until the server channel is closed. The event
     * loop group is always shut down on exit, including when binding fails.
     *
     * @throws Exception if binding or waiting for the close fails
     */
    @Test
    public void serverModeComplex() throws Exception {

        NioEventLoopGroup thread = new NioEventLoopGroup(1);
        try {
            NioServerSocketChannel server = new NioServerSocketChannel();
            thread.register(server);

            // Reactive style: behaviour is attached as handlers on the pipeline.
            ChannelPipeline pipeline = server.pipeline();
            // The accept handler receives new clients and registers them with the selector.
            // Version 1, a single shared handler instance:
            //     pipeline.addLast(new MyAcceptHandler(thread, new MyInHandler()));
            // Version 2, supports multiple clients:
            pipeline.addLast(new MyAcceptHandler(thread, new ChannelInitHandler()));
            ChannelFuture bind = server.bind(new InetSocketAddress("192.168.150.1", 9090));

            // Block until the server channel is closed.
            bind.sync().channel().closeFuture().sync();
            System.out.println("server close....");
        } finally {
            thread.shutdownGracefully();
        }

    }

    /**
     * Server, {@link ServerBootstrap} variant of {@code serverModeComplex}.
     * <p>
     * One single-threaded group is used for both accepting and serving connections. Every
     * accepted channel gets a {@code MyInHandler} via a {@link ChannelInitializer}. The
     * server binds to 192.168.150.1:9090 and the test blocks until the server channel is
     * closed. The event loop group is always shut down on exit, including when binding
     * fails.
     *
     * @throws InterruptedException if the thread is interrupted while waiting on a future
     */
    @Test
    public void serverModeSimple() throws InterruptedException {
        NioEventLoopGroup group = new NioEventLoopGroup(1);
        try {
            ServerBootstrap bs = new ServerBootstrap();
            ChannelFuture bind = bs.group(group, group)
                    .channel(NioServerSocketChannel.class)
                    // Alternative: a dedicated initializer class
                    //     .childHandler(new ChannelInit())
                    .childHandler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel ch) throws Exception {
                            ChannelPipeline p = ch.pipeline();
                            p.addLast(new MyInHandler());
                        }
                    })
                    .bind(new InetSocketAddress("192.168.150.1", 9090));

            // Block until the server channel is closed.
            bind.sync().channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }

    }


}


/**
 * Accept handler for the listening (server) channel.
 * <p>
 * Each message read on the server pipeline is treated as a newly accepted child channel:
 * the configured handler is appended to the child's pipeline and the child is registered
 * with the configured event loop group. If either step fails the child is closed so the
 * accepted connection is not leaked.
 */
class MyAcceptHandler extends ChannelInboundHandlerAdapter {


    /** Group that accepted child channels are registered with. */
    private final EventLoopGroup selector;
    /** Handler appended to every accepted child's pipeline. */
    private final ChannelHandler handler;

    /**
     * @param thread  event loop group used to register accepted channels
     * @param handler handler added to each accepted channel's pipeline; since the same
     *                instance is added to every accepted channel, it has to be sharable
     *                when more than one client connects
     */
    public MyAcceptHandler(EventLoopGroup thread, ChannelHandler handler) {
        this.selector = thread;
        this.handler = handler;
    }

    /**
     * Logs the registration of the server channel and forwards the event.
     */
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("server registerd...");
        // Keep the event flowing for any handler placed after this one.
        ctx.fireChannelRegistered();
    }

    /**
     * Wires up an accepted child channel.
     *
     * @param ctx context of this handler on the server pipeline
     * @param msg the accepted child channel
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //  -- a listening socket can only accept
        //  -- an ordinary (connected) socket is used for read/write
        Channel client = (Channel) msg;

        try {
            // Reactive style: attach the handler to the child's pipeline.
            // 1. client::pipeline[handler]
            client.pipeline().addLast(handler);

            // Register the child; registration is asynchronous, so failures are
            // reported through the returned future.
            selector.register(client).addListener((ChannelFutureListener) future -> {
                if (!future.isSuccess()) {
                    client.unsafe().closeForcibly();
                }
            });
        } catch (RuntimeException e) {
            // e.g. a non-sharable handler added a second time: do not leak the socket.
            client.unsafe().closeForcibly();
            throw e;
        }

    }
}




/**
 * Bridge handler that installs a fresh {@code MyInHandler} on each channel.
 * <p>
 * Why have it at all? It is optional, but without it {@code MyInHandler} itself would have
 * to be a shared singleton, i.e. annotated with {@code @ChannelHandler.Sharable}. This
 * handler is sharable instead: on registration it adds a new {@code MyInHandler} to the
 * channel's pipeline, forwards the registration event to it and removes itself.
 */
@ChannelHandler.Sharable
class ChannelInitHandler extends ChannelInboundHandlerAdapter {

    /**
     * Swaps this bridge for a per-channel {@code MyInHandler}.
     *
     * @param ctx context of this handler on the channel being registered
     */
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        ChannelPipeline pipeline = ctx.pipeline();
        // 2. client::pipeline[ChannelInitHandler, MyInHandler]
        pipeline.addLast(new MyInHandler());
        // Pass the event on; otherwise the new handler never sees the registration.
        ctx.fireChannelRegistered();
        // 3. client::pipeline[MyInHandler]
        pipeline.remove(this);
    }

    /**
     * Not expected to run once this handler has removed itself from the pipeline in
     * {@code channelRegistered}; if it does run, it logs and forwards the message.
     *
     * @param ctx context of this handler
     * @param msg the inbound message
     * @throws Exception if forwarding fails
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("haha");
        super.channelRead(ctx, msg);
    }


}


/**
 * Read/write handler: prints every received buffer as UTF-8 text and echoes it back.
 * <p>
 * A read/write handler is written by the user, so it should not be forced to be a
 * {@code @ChannelHandler.Sharable} singleton; a new instance is created per channel.
 */
class MyInHandler extends ChannelInboundHandlerAdapter {

    /**
     * Logs the registration and forwards the event.
     */
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("client  registed...");
        ctx.fireChannelRegistered();
    }

    /**
     * Logs the activation and forwards the event.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("client active...");
        ctx.fireChannelActive();
    }

    /**
     * Prints the readable bytes of the received buffer as UTF-8 and writes the same
     * buffer back to the peer. Messages that are not a {@link ByteBuf} are passed on
     * untouched.
     *
     * @param ctx context of this handler
     * @param msg the inbound message
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (!(msg instanceof ByteBuf)) {
            ctx.fireChannelRead(msg);
            return;
        }
        ByteBuf buf = (ByteBuf) msg;
        // Decode from readerIndex without consuming the bytes (a readCharSequence call
        // would advance readerIndex and leave nothing to echo), and without assuming the
        // readable region starts at absolute index 0.
        String text = buf.toString(CharsetUtil.UTF_8);
        System.out.println(text);
        // The buffer is handed over to the outbound path here.
        ctx.writeAndFlush(buf);
    }

}